/*
 *    Copyright 2022 The DSMS Authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.dsms.modules.rbd.service.impl;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ceph.rados.IoCTX;
import com.ceph.rados.Rados;
import com.ceph.rados.exceptions.ErrorCode;
import com.ceph.rados.exceptions.RadosException;
import com.ceph.rbd.RbdException;
import com.ceph.rbd.RbdImage;
import com.ceph.rbd.jna.RbdImageInfo;
import com.dsms.common.constant.ResultCode;
import com.dsms.common.constant.SystemConst;
import com.dsms.common.constant.TaskStatusEnum;
import com.dsms.common.constant.TaskTypeEnum;
import com.dsms.common.exception.DsmsEngineException;
import com.dsms.common.model.PageParam;
import com.dsms.common.model.Result;
import com.dsms.common.taskmanager.TaskContext;
import com.dsms.common.taskmanager.model.AsyncTask;
import com.dsms.common.taskmanager.model.Task;
import com.dsms.common.taskmanager.service.ITaskService;
import com.dsms.common.util.ByteUtil;
import com.dsms.common.util.FilterUtil;
import com.dsms.common.util.ObjectUtil;
import com.dsms.common.util.PageUtil;
import com.dsms.dfsbroker.rbd.model.Rbd;
import com.dsms.dfsbroker.rbd.model.dto.RbdCreateDTO;
import com.dsms.dfsbroker.rbd.model.dto.RbdDeleteDTO;
import com.dsms.dfsbroker.rbd.model.dto.RbdGetDTO;
import com.dsms.dfsbroker.rbd.model.dto.RbdUpdateDTO;
import com.dsms.dfsbroker.rbd.service.IRbdService;
import com.dsms.dfsbroker.rbd.service.RbdWrapper;
import com.dsms.dfsbroker.storagepool.model.StoragePool;
import com.dsms.dfsbroker.storagepool.service.IStoragePoolService;
import com.dsms.modules.rbd.model.vo.RbdListVO;
import com.dsms.modules.util.RadosSingleton;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.zeroturnaround.zip.commons.IOUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

/**
 * {@link IRbdService} implementation.
 *
 * <p>Read operations ({@link #get}, {@link #list}) talk to the cluster directly through the
 * shared {@link Rados} handle. Mutating operations ({@link #create}, {@link #delete},
 * {@link #update}) only validate the request and persist a task in {@code QUEUE} status via
 * {@link ITaskService}.
 */
@Service
@Slf4j
public class RbdServiceImpl implements IRbdService {

    private static final String TASK_ALREADY_EXIST = "Task already exist,task message: {}";

    private static final String POOL_NOT_AVAILABLE = "pool not available,pool name: {}";

    private final ITaskService taskService;
    private final IStoragePoolService storagePoolService;
    private final TaskContext taskContext;

    public RbdServiceImpl(ITaskService taskService, IStoragePoolService storagePoolService, TaskContext taskContext) {
        this.taskService = taskService;
        this.storagePoolService = storagePoolService;
        this.taskContext = taskContext;
    }

    /**
     * Loads a single storage volume (RBD image) together with its used size.
     *
     * <p>The used size is computed as {@code obj_size * (number of objects in the data pool whose
     * name starts with the image's block name prefix)}.
     *
     * @param rbdGetDTO pool name and image name; rejected when
     *                  {@link ObjectUtil#checkObjFieldContainNull} reports it as incomplete
     * @return the volume, or {@code null} when the pool is not available or the image does not
     *         exist ({@code ENOENT})
     * @throws RuntimeException    if the DTO is incomplete or librbd reports an error other than
     *                             {@code ENOENT}
     * @throws DsmsEngineException if any other I/O error occurs while querying the cluster
     */
    @Override
    public Rbd get(RbdGetDTO rbdGetDTO) {
        if (ObjectUtil.checkObjFieldContainNull(rbdGetDTO)) {
            throw new RuntimeException("rbdGetDTO can not be null or empty");
        }
        String poolName = rbdGetDTO.getPoolName();
        String rbdName = rbdGetDTO.getRbdName();
        if (!storagePoolService.isPoolAvailable(poolName)) {
            return null;
        }
        Rados rados = RadosSingleton.INSTANCE.getRados();
        try (IoCTX ioctx = rados.ioCtxCreate(poolName);
             RbdImage rbdImage = new com.ceph.rbd.Rbd(ioctx).open(rbdName)) {
            RbdWrapper rbdWrapper = new RbdWrapper(ioctx);
            int dataPoolId = rbdWrapper.getDataPoolId(rbdName);
            // By default the data pool is the metadata pool itself. A non-zero id means the image
            // was created on top of an erasure-coded pool, so resolve the separate data pool.
            String dataPoolName = poolName;
            if (dataPoolId != 0) {
                dataPoolName = rados.poolReverseLookup(dataPoolId);
            }

            RbdImageInfo imageInfo = rbdImage.stat();
            // Images backed by an erasure-coded pool keep their data objects in the data pool.
            List<String> objects;
            if (Objects.equals(dataPoolName, poolName)) {
                objects = List.of(ioctx.listObjects());
            } else {
                try (IoCTX dataIoctx = rados.ioCtxCreate(dataPoolName)) {
                    objects = List.of(dataIoctx.listObjects());
                }
            }

            // Decode the prefix once instead of once per listed object.
            String blockNamePrefix = new String(imageInfo.block_name_prefix, StandardCharsets.UTF_8).trim();
            long dataObjectCount = objects.stream()
                    .filter(objectName -> objectName.startsWith(blockNamePrefix))
                    .count();

            Long usedSize = imageInfo.obj_size * dataObjectCount;
            return new Rbd(imageInfo.order, rbdName, poolName, dataPoolName, usedSize, imageInfo.size);
        } catch (RbdException e) {
            if (Objects.equals(e.getReturnValue(), ErrorCode.ENOENT.getErrorCode())) {
                log.warn(ErrorCode.getErrorMessage(e.getReturnValue()), e);
                return null;
            }
            throw new RuntimeException(ErrorCode.getErrorMessage(e.getReturnValue()), e);
        } catch (IOException e) {
            throw new DsmsEngineException(e, ResultCode.RBD_GETRBDINFO_ERROR);
        }
    }

    /**
     * Lists the storage volumes of every available pool, then filters and pages the result.
     *
     * @param pageParam paging parameters; when it is a {@link RbdListVO} its rbd name (fuzzy) and
     *                  pool name (exact) filters are applied as well
     * @return the requested page of volumes
     * @throws RuntimeException if the cluster cannot be queried; the underlying exception is kept
     *                          as the cause
     */
    @Override
    public Page<Rbd> list(PageParam pageParam) {
        List<Rbd> rbds = new ArrayList<>();
        Rados rados = RadosSingleton.INSTANCE.getRados();
        try {
            for (String poolName : rados.poolList()) {
                if (!storagePoolService.isPoolAvailable(poolName)) {
                    continue;
                }
                try (IoCTX ioctx = rados.ioCtxCreate(poolName)) {
                    // Resolve every image name of this pool into a full Rbd record.
                    com.ceph.rbd.Rbd rbd = new com.ceph.rbd.Rbd(ioctx);
                    for (String rbdName : rbd.list()) {
                        Rbd resultRbd = get(new RbdGetDTO(poolName, rbdName));
                        if (resultRbd != null) {
                            rbds.add(resultRbd);
                        }
                    }
                } catch (RbdException e) {
                    // Keep the original exception as cause so the stack trace is not lost.
                    throw new RuntimeException(ErrorCode.getErrorMessage(e.getReturnValue()), e);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        } catch (RadosException e) {
            throw new RuntimeException(ErrorCode.getErrorMessage(e.getReturnValue()), e);
        }

        // Page and filter the collected volumes by pool name and rbd name.
        return queryAndPage(rbds, pageParam);
    }

    /**
     * Applies the optional {@link RbdListVO} filters and cuts the requested page out of the list.
     *
     * @param rbds      all collected volumes
     * @param pageParam paging parameters, optionally carrying name filters
     * @return the page produced by {@link PageUtil#getPageData}
     */
    private Page<Rbd> queryAndPage(List<Rbd> rbds, PageParam pageParam) {
        List<Rbd> filtered = rbds;
        if (pageParam instanceof RbdListVO) {
            RbdListVO rbdListVO = (RbdListVO) pageParam;
            filtered = rbds.stream()
                    .filter(rbd -> FilterUtil.fuzzy(rbdListVO.getRbdName(), rbd.getRbdName()))
                    .filter(rbd -> FilterUtil.matches(rbdListVO.getPoolName(), rbd.getPoolName()))
                    .collect(Collectors.toList());
        }
        return PageUtil.getPageData(filtered, pageParam.getPageNo(), pageParam.getPageSize());
    }

    /**
     * Validates a create request and queues a {@code CREATE_RBD} task for it.
     *
     * @param rbdCreateDTO the volume to create
     * @return {@code false} when the DTO is empty, otherwise the result of saving the task
     * @throws DsmsEngineException if the pool is unavailable, the data pool name is missing for an
     *                             erasure-code based volume, the capacity limits are exceeded, or
     *                             task validation reports the task as already existing
     */
    @Override
    public boolean create(RbdCreateDTO rbdCreateDTO) {
        if (ObjectUtils.isEmpty(rbdCreateDTO)) {
            log.error("rbdCreateDTO can not be empty");
            return false;
        }
        requirePoolAvailable(rbdCreateDTO.getPoolName());

        // Creating a volume whose capacity exceeds what the pool can still allocate is not allowed.
        StoragePool metaPool = storagePoolService.get(rbdCreateDTO.getPoolName());
        StoragePool dataPool = metaPool;
        // Boolean.TRUE.equals(...) treats a missing flag as "not erasure-code based" instead of
        // failing with a NullPointerException on auto-unboxing.
        if (Boolean.TRUE.equals(rbdCreateDTO.getBasedOnEra())) {
            if (CharSequenceUtil.isBlank(rbdCreateDTO.getDataPoolName())) {
                throw DsmsEngineException.exceptionWithMessage("数据池名称不能为空", ResultCode.RBD_CREATE_TASK_ERROR);
            }
            dataPool = storagePoolService.get(rbdCreateDTO.getDataPoolName());
            if (metaPool.getRbdAvailableCapacity() < Rbd.RBD_META_DATA_BYTES) {
                throw DsmsEngineException.exceptionWithMessage("元数据池可分配容量不可小于存储卷元数据需要容量(100M)", ResultCode.RBD_CREATE_TASK_ERROR);
            }
        }
        // dataPool is the metadata pool itself unless the volume is erasure-code based.
        if (ByteUtil.gbToBytes(rbdCreateDTO.getRbdSize()) > dataPool.getRbdAvailableCapacity()) {
            throw DsmsEngineException.exceptionWithMessage(ResultCode.RBD_CREATE_TASK_ERROR_EXCEEDS_POOL_CAPACITY.getMessage(), ResultCode.RBD_CREATE_TASK_ERROR);
        }

        String taskMessage = TaskTypeEnum.CREATE_RBD.getName() + ":" + rbdCreateDTO.getRbdName() + ",存储池:" + rbdCreateDTO.getPoolName();
        return submitTask(TaskTypeEnum.CREATE_RBD, taskMessage, rbdCreateDTO);
    }

    /**
     * Validates a delete request and queues a {@code DELETE_RBD} task for it.
     *
     * @param rbdDeleteDTO the volume to delete
     * @return {@code false} when the DTO is empty, otherwise the result of saving the task
     * @throws DsmsEngineException if the pool is unavailable or task validation reports the task
     *                             as already existing
     */
    @Override
    public boolean delete(RbdDeleteDTO rbdDeleteDTO) {
        if (ObjectUtils.isEmpty(rbdDeleteDTO)) {
            log.error("rbdDeleteDTO can not be empty");
            return false;
        }
        requirePoolAvailable(rbdDeleteDTO.getPoolName());

        String taskMessage = TaskTypeEnum.DELETE_RBD.getName() + ":" + rbdDeleteDTO.getRbdName() + ",存储池:" + rbdDeleteDTO.getPoolName();
        return submitTask(TaskTypeEnum.DELETE_RBD, taskMessage, rbdDeleteDTO);
    }

    /**
     * Validates a resize request and queues an {@code UPDATE_RBD} task for it.
     *
     * <p>The requested size must fit into the data pool's remaining capacity (taking the current
     * image size into account) and must be larger than the volume's used size.
     *
     * @param rbdUpdateDTO pool name, volume name and the new size in GB
     * @return {@code false} when the DTO is empty, otherwise the result of saving the task
     * @throws DsmsEngineException if the pool is unavailable, the volume does not exist, a size
     *                             constraint is violated, or task validation reports the task as
     *                             already existing
     * @throws RuntimeException    if the image cannot be opened or inspected
     */
    @Override
    public boolean update(RbdUpdateDTO rbdUpdateDTO) {
        if (ObjectUtils.isEmpty(rbdUpdateDTO)) {
            log.error("rbdUpdateDTO can not be empty");
            return false;
        }
        requirePoolAvailable(rbdUpdateDTO.getPoolName());

        Rbd rbd = get(new RbdGetDTO(rbdUpdateDTO.getPoolName(), rbdUpdateDTO.getRbdName()));
        if (Objects.isNull(rbd)) {
            throw DsmsEngineException.exceptionWithMessage("存储卷不存在", ResultCode.RBD_CREATE_TASK_ERROR);
        }
        // Capacity is always accounted against the data pool; for volumes that are not based on an
        // erasure-coded pool the data pool name equals the pool name, so one lookup covers both.
        StoragePool storagePool = storagePoolService.get(rbd.getDataPoolName());

        Rados rados = RadosSingleton.INSTANCE.getRados();
        try (IoCTX ioctx = rados.ioCtxCreate(rbdUpdateDTO.getPoolName());
             RbdImage rbdImage = new com.ceph.rbd.Rbd(ioctx).open(rbdUpdateDTO.getRbdName())) {
            long currentSize = rbdImage.stat().size;
            long rbdAvailableCapacity = storagePool.getRbdAvailableCapacity();
            // Only the growth beyond the current size has to fit into the remaining capacity.
            if (ByteUtil.gbToBytes(rbdUpdateDTO.getRbdSize()) - currentSize > rbdAvailableCapacity) {
                throw DsmsEngineException.exceptionWithMessage(ResultCode.RBD_CREATE_TASK_ERROR_EXCEEDS_POOL_CAPACITY.getMessage(), ResultCode.RBD_CREATE_TASK_ERROR);
            }

            if (ByteUtil.gbToBytes(rbdUpdateDTO.getRbdSize()) <= rbd.getUsedSize()) {
                throw DsmsEngineException.exceptionWithMessage("存储卷容量不可小于已使用量", ResultCode.RBD_CREATE_TASK_ERROR);
            }
        } catch (DsmsEngineException e) {
            log.error(e.getMessage(), e);
            throw e;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }

        String taskMessage = TaskTypeEnum.UPDATE_RBD.getName() + ":" + rbdUpdateDTO.getRbdName() + "存储池:" + rbdUpdateDTO.getPoolName() + ",容量:" + rbdUpdateDTO.getRbdSize() + "G";
        return submitTask(TaskTypeEnum.UPDATE_RBD, taskMessage, rbdUpdateDTO);
    }

    /**
     * Returns the admin keyring file content together with its MD5 digest.
     *
     * <p>The digest is computed from the very bytes placed in the result, so it always matches
     * the returned payload.
     *
     * @return an OK result holding a map with the keyring bytes (keyed by
     *         {@link SystemConst#ADMIN_KEYRING_NAME}) and their {@code "md5"} hex digest, or an
     *         error result when the file is missing or cannot be read
     */
    @Override
    public Result<Object> downloadKey() {
        File file = new File(SystemConst.ADMIN_KEYRING_PATH);
        // isFile() is already false for a path that does not exist.
        if (!file.isFile()) {
            log.error("File does not exist");
            return Result.error("File does not exist");
        }
        try (FileInputStream fit = new FileInputStream(file)) {
            byte[] byteArray = IOUtils.toByteArray(fit);
            String md5 = SecureUtil.md5().digestHex(byteArray);

            Map<String, Object> map = new HashMap<>();
            map.put(SystemConst.ADMIN_KEYRING_NAME, byteArray);
            map.put("md5", md5);

            return Result.OK(map);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace reaches the log.
            log.error(e.getMessage(), e);
            return Result.error(e.getMessage());
        }
    }

    /**
     * Logs and rejects the request when the given pool is not available.
     *
     * @param poolName name of the pool the request targets
     * @throws DsmsEngineException with {@link ResultCode#POOL_UNAVAILABLE_ERROR} when the pool is
     *                             not available
     */
    private void requirePoolAvailable(String poolName) {
        if (!storagePoolService.isPoolAvailable(poolName)) {
            log.error(POOL_NOT_AVAILABLE, poolName);
            throw new DsmsEngineException(ResultCode.POOL_UNAVAILABLE_ERROR);
        }
    }

    /**
     * Validates the task against the task context and saves it in {@code QUEUE} status.
     *
     * @param taskType    type of the task to queue
     * @param taskMessage human readable task description, also used for task validation
     * @param payload     request DTO, stored with the task as JSON
     * @return the result of {@link ITaskService#save}
     * @throws DsmsEngineException with {@link ResultCode#TASK_EXIST} when validation fails
     */
    private boolean submitTask(TaskTypeEnum taskType, String taskMessage, Object payload) {
        if (!taskContext.validateTask(taskType, taskMessage)) {
            log.warn(TASK_ALREADY_EXIST, taskMessage);
            throw new DsmsEngineException(ResultCode.TASK_EXIST);
        }
        Task task = new AsyncTask(
                taskType.getName(),
                taskType.getType(),
                TaskStatusEnum.QUEUE.getStatus(),
                taskMessage,
                JSONUtil.toJsonStr(payload));

        return taskService.save(task);
    }
}
